package kafka_go

import (
	"context"
	"errors"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"
)

/**
消费者构建
*/
type factoryConsumer struct {
	//是否后台运行
	isBackGround bool
	//多个消费者注册
	consumerBuilds []BuildConsumerApi
}

func NewFactoryConsumer() *factoryConsumer {
	return &factoryConsumer{isBackGround: false}
}

func NewFactoryConsumerBackGround() *factoryConsumer {
	return &factoryConsumer{isBackGround: true}
}

//注册消费者
func (this *factoryConsumer) RegisterConsumer(consumerBuild ...BuildConsumerApi) *factoryConsumer {
	if this.consumerBuilds == nil {
		this.consumerBuilds = make([]BuildConsumerApi, 0)
	}
	if consumerBuild == nil || len(consumerBuild) == 0 {
		return this
	}
	buildLen := len(consumerBuild)
	this.consumerBuilds = make([]BuildConsumerApi, buildLen)
	copy(this.consumerBuilds, consumerBuild)
	return this
}

/**
执行消费
*/
func (this *factoryConsumer) Run() error {
	return this._consumerTask()
}

//消费任务
func (this *factoryConsumer) _consumerTask() error {
	if this.consumerBuilds == nil || len(this.consumerBuilds) == 0 {
		return KAFKA_CONSUMER_ERROR_BUILD_EMPTY
	}
	if this.isBackGround {
		return this._createConsumers()
	} else {
		return this._createConsumersWait()
	}

}

//创建指定消费任务
func (this *factoryConsumer) _createConsumers() error {

	var errSlice = []error{}
	for k, v := range this.consumerBuilds {
		_k := k
		_v := v
		go func(num int, factoryConsumer *factoryConsumer, consumer BuildConsumerApi) {
			log.InfoNoFileInfo("create kafka consumer, index:%d, groupId:%s, topic:%s", num, consumer.GetGroupId(), consumer.GetTopic())
			err := _consumerPartition(num, factoryConsumer, consumer)
			if err != nil {
				errSlice = append(errSlice, err)
			}

		}(_k, this, _v)
	}
	if len(errSlice) != 0 {
		return errors.New("consumer start error")
	}
	return nil
}

func (this *factoryConsumer) _createConsumersWait() error {
	var wg = &sync.WaitGroup{}
	var errSlice = []error{}
	for k, v := range this.consumerBuilds {
		_k := k
		_v := v
		wg.Add(1)
		go func(num int, factoryConsumer *factoryConsumer, consumer BuildConsumerApi) {
			defer wg.Done()
			log.InfoNoFileInfo("create kafka consumer wait, index:%d, groupId:%s, topic:%s", num, consumer.GetGroupId(), consumer.GetTopic())
			err := _consumerPartition(num, factoryConsumer, consumer)
			if err != nil {
				errSlice = append(errSlice, err)
			}
		}(_k, this, _v)
	}
	wg.Wait()
	if len(errSlice) != 0 {
		return errors.New("consumer start error")
	}
	return nil
}

/**
多分区消费处理
*/
func _consumerPartition(num int, consumer *factoryConsumer, buildConsumer BuildConsumerApi) error {
	//如果是多分区，只能使用非消费组方式
	if buildConsumer.IsMultiplePartition() {
		return _consumerItem(num, consumer, buildConsumer)
	} else {
		return _consumerItemGroup(num, consumer, buildConsumer)
	}

}

func _consumerItem(num int, factoryConsumer *factoryConsumer, buildConsumer BuildConsumerApi) error {
	broker := strings.Split(buildConsumer.GetAddr(), ",")
	topic := buildConsumer.GetTopic()
	conf := buildConsumer.GetConfig()
	responseListen := buildConsumer.GetResponseListener()

	//conf.Consumer.Group.InstanceId = groupId
	log.InfoNoFileInfo("starting consumer [index:%d, topics:%s]", num, topic)
	ccClient, err := sarama.NewConsumer(broker, conf)
	if err != nil {
		log.ErrorNoFileInfo("can not be connection server :%s", err.Error())
		return err
	}

	//获取所有分区
	log.InfoNoFileInfo("load topic %s partition", topic)
	partitionList, err := ccClient.Partitions(topic)
	if err != nil {
		log.ErrorNoFileInfo("load topic:%s partitions error :%s", topic, err.Error())
		//log.Panicf("load topic:%s partitions error :%s", topic)
		return err
	}
	if len(partitionList) == 0 {
		log.ErrorNoFileInfo("topic %s partitions size 0", topic)
		//log.Panicf("topic %s partitions size 0", topic)
		return errors.New("topic partitions size 0")
	}
	var wg = &sync.WaitGroup{}

	for _, v := range partitionList {
		var _c = newConsumer()
		_c.SetListener(responseListen)

		_c.SetGroupId("")
		_c.SetBuilder(buildConsumer)
		pc, er := ccClient.ConsumePartition(topic, v, sarama.OffsetNewest)
		if err != nil {
			log.ErrorNoFileInfo("fail consumer topic %s partition %d, error:%s", topic, v, er.Error())
			break
		}

		//defer pc.AsyncClose()
		wg.Add(1)
		go func(pClient sarama.PartitionConsumer, t string, p int32, c *consumer, b BuildConsumerApi) {
			defer wg.Done()
			log.InfoNoFileInfo("receive consume topic %s, partition %d", t, p)
			for {
				select {
				case message := <-pClient.Messages():
					if b.IsDebug() {
						log.DebugNoFileInfo("Message claimed:  timestamp = %v, topic = %s, partition=%d, offset=%d, message = %s,", message.Timestamp, message.Topic, message.Partition, message.Offset, string(message.Value))
					}
					cpool := c.messagePool.Get().(*ConsumerMessageContext)
					cpool.builder = c.builder
					cpool.groupId = c.groupId
					cpool.topic = message.Topic
					cpool.partition = message.Partition
					cpool.offset = message.Offset
					cpool.message = message.Value
					cpool.timeStamp = message.Timestamp
					cpool.val = message
					cpool.consumerSession = NewConsumerSession(nil, message, b.IsAutoCommit())
					vc := *cpool
					cpool.reset()
					c.messagePool.Put(cpool)
					if c != nil {
						c.responseListener(&vc)
					}
				}
			}
		}(pc, topic, v, _c, buildConsumer)
	}
	wg.Wait()
	return nil
}

//创建build消费任务
func _consumerItemGroup(num int, factoryConsumer *factoryConsumer, buildConsumer BuildConsumerApi) error {
	keepRunning := true
	consumptionIsPaused := false
	ctx, cancel := context.WithCancel(context.Background())
	broker := strings.Split(buildConsumer.GetAddr(), ",")
	groupId := buildConsumer.GetGroupId()
	topics := buildConsumer.GetTopic()
	conf := buildConsumer.GetConfig()
	responseListen := buildConsumer.GetResponseListener()
	conf.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
		log.InfoNoFileInfo("disconnection reconnects [%d]", retries)
		return time.Second
	}
	log.InfoNoFileInfo("starting consumer [index:%d, groupId:%s, topics:%s]", num, groupId, topics)

	ccGroup, err := sarama.NewConsumerGroup(broker, groupId, conf)
	if err != nil {
		log.ErrorNoFileInfo("can not be connection server")
		cancel()
		return err
	}
	_c := newConsumer()
	_c.SetListener(responseListen)
	_c.SetGroupId(groupId)
	_c.SetBuilder(buildConsumer)
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if errs := ccGroup.Consume(ctx, []string{topics}, _c); errs != nil {
				log.Panicf("Error from consumer: %v", errs)
			}
			if ctx.Err() != nil {
				break
			}
			_c.ready = make(chan bool)
		}
	}()
	<-_c.ready
	log.InfoNoFileInfo("wait for receiving messages [num:%d, groupId:%s, topic:%s]", num, groupId, topics)
	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGHUP)
	for keepRunning {
		select {
		case <-ctx.Done():
			log.ErrorNoFileInfo("terminating: context cancelled")
			keepRunning = false
		case <-sigusr1:
			_toggleConsumptionFlow(ccGroup, &consumptionIsPaused)
		}
	}
	cancel()
	wg.Wait()
	if err = ccGroup.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
		return err
	}
	return nil
}

func _toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	if *isPaused {
		log.InfoNoFileInfo("Resuming consumption")
		client.ResumeAll()

	} else {
		log.InfoNoFileInfo("Pausing consumption")
		client.PauseAll()
	}

	*isPaused = !*isPaused
}

type consumer struct {
	builder          BuildConsumerApi
	groupId          string
	ready            chan bool
	responseListener ConsumerResponseListener
	messagePool      sync.Pool
}

func newConsumer() *consumer {
	_c := &consumer{
		ready: make(chan bool),
	}
	_c.messagePool.New = func() any {
		return &ConsumerMessageContext{}
	}
	return _c
}

func (this *consumer) SetBuilder(builder BuildConsumerApi) {
	this.builder = builder
}

func (this *consumer) SetListener(listener ConsumerResponseListener) {
	this.responseListener = listener
}
func (this *consumer) SetGroupId(groupId string) {
	this.groupId = groupId
}
func (this *consumer) Setup(sarama.ConsumerGroupSession) error {
	close(this.ready)
	return nil
}

func (this *consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (this *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message := <-claim.Messages():
			if this.builder.IsDebug() {
				log.DebugNoFileInfo("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
			}
			cpool := this.messagePool.Get().(*ConsumerMessageContext)
			cpool.builder = this.builder
			cpool.groupId = this.groupId
			cpool.topic = message.Topic
			cpool.partition = message.Partition
			cpool.offset = message.Offset
			cpool.message = message.Value
			cpool.timeStamp = message.Timestamp
			cpool.val = message

			cpool.consumerSession = NewConsumerSession(session, message, this.builder.IsAutoCommit())
			vc := *cpool
			cpool.reset()
			this.messagePool.Put(cpool)
			if this.responseListener != nil {
				this.responseListener(&vc)
			}
		case <-session.Context().Done():
			log.ErrorNoFileInfo("Connection closure, waiting for reconnection")
			return nil
		}
	}
}
